data lake

This commit is contained in:
string 2025-10-16 08:40:28 +05:30
parent 3841a1d0c8
commit 301da36697
9 changed files with 1470 additions and 346 deletions

View File

@ -0,0 +1,124 @@
package com.realnet.DataLake.Controllers;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.realnet.DataLake.Entity.Data_lake;
import com.realnet.DataLake.Services.Data_lakeService;
import com.realnet.fnd.response.EntityResponse;
@RequestMapping(value = "/Data_lake")
@CrossOrigin("*")
@RestController
public class Data_lakeController {
@Autowired
private Data_lakeService Service;
@Value("${projectPath}")
private String projectPath;
@PostMapping("/Data_lake")
public Data_lake Savedata(@RequestBody Data_lake data) {
Data_lake save = Service.Savedata(data);
System.out.println("data saved..." + save);
return save;
}
@PutMapping("/Data_lake/{id}")
public Data_lake update(@RequestBody Data_lake data, @PathVariable Integer id) {
Data_lake update = Service.update(data, id);
System.out.println("data update..." + update);
return update;
}
// get all with pagination
@GetMapping("/Data_lake/getall/page")
public Page<Data_lake> getall(@RequestParam(value = "page", required = false) Integer page,
@RequestParam(value = "size", required = false) Integer size) {
Pageable paging = PageRequest.of(page, size);
Page<Data_lake> get = Service.getAllWithPagination(paging);
return get;
}
@GetMapping("/Data_lake")
public List<Data_lake> getdetails() {
List<Data_lake> get = Service.getdetails();
return get;
}
// get all without authentication
@GetMapping("/token/Data_lake")
public List<Data_lake> getallwioutsec() {
List<Data_lake> get = Service.getdetails();
return get;
}
@GetMapping("/Data_lake/{id}")
public Data_lake getdetailsbyId(@PathVariable Integer id) {
Data_lake get = Service.getdetailsbyId(id);
return get;
}
@DeleteMapping("/Data_lake/{id}")
public ResponseEntity<?> delete_by_id(@PathVariable Integer id) {
Service.delete_by_id(id);
return new ResponseEntity<>(new EntityResponse("Deleted"), HttpStatus.OK);
}
// json Updtaed
@PutMapping("/Data_lake/json/{id}")
public Data_lake update(@PathVariable Integer id) throws JsonProcessingException {
Data_lake update = Service.Updatejson(id);
System.out.println("json update..." + update);
return update;
}
@GetMapping("/Data_lake/merge/{id}")
public ResponseEntity<?> mergeBatchData(@PathVariable Integer id) {
try {
// Get merged JSON string from service
String mergedJson = Service.mergeBatchData(id);
// Parse the merged JSON string back into JSON structure
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(mergedJson);
// Return as actual JSON (not as string)
return ResponseEntity.ok(jsonNode);
} catch (Exception e) {
// Build error response manually (Map.of() not available in Java 8)
Map<String, String> error = new HashMap<>();
error.put("error", e.getMessage());
return ResponseEntity.status(500).body(error);
}
}
}

View File

@ -0,0 +1,57 @@
package com.realnet.DataLake.Controllers;
import javax.xml.bind.annotation.XmlRootElement;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@RestController
@RequestMapping("/token/api")
public class XmlApiExample {
@GetMapping(value = "/getUser", produces = MediaType.APPLICATION_XML_VALUE)
public User getUser() {
User user = new User();
user.setId(101);
user.setName("John Doe");
user.setEmail("john.doe@example.com");
return user;
}
// Dummy XML model class
@XmlRootElement(name = "user")
public static class User {
private int id;
private String name;
private String email;
// Getters and setters
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
}
}

View File

@ -0,0 +1,115 @@
package com.realnet.DataLake.Controllers;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.realnet.DataLake.Entity.Data_lake;
import com.realnet.DataLake.Services.Data_lakeService;
import com.realnet.fnd.response.EntityResponse;
@RequestMapping(value = "/token/Data_lake")
@CrossOrigin("*")
@RestController
public class tokenFree_Data_lakeController {
@Autowired
private Data_lakeService Service;
@Value("${projectPath}")
private String projectPath;
@PostMapping("/Data_lake")
public Data_lake Savedata(@RequestBody Data_lake data) {
Data_lake save = Service.Savedata(data);
System.out.println("data saved..." + save);
return save;
}
@PutMapping("/Data_lake/{id}")
public Data_lake update(@RequestBody Data_lake data, @PathVariable Integer id) {
Data_lake update = Service.update(data, id);
System.out.println("data update..." + update);
return update;
}
// get all with pagination
@GetMapping("/Data_lake/getall/page")
public Page<Data_lake> getall(@RequestParam(value = "page", required = false) Integer page,
@RequestParam(value = "size", required = false) Integer size) {
Pageable paging = PageRequest.of(page, size);
Page<Data_lake> get = Service.getAllWithPagination(paging);
return get;
}
@GetMapping("/Data_lake")
public List<Data_lake> getdetails() {
List<Data_lake> get = Service.getdetails();
return get;
}
// get all without authentication
@GetMapping("/token/Data_lake")
public List<Data_lake> getallwioutsec() {
List<Data_lake> get = Service.getdetails();
return get;
}
@GetMapping("/Data_lake/{id}")
public Data_lake getdetailsbyId(@PathVariable Integer id) {
Data_lake get = Service.getdetailsbyId(id);
return get;
}
@DeleteMapping("/Data_lake/{id}")
public ResponseEntity<?> delete_by_id(@PathVariable Integer id) {
Service.delete_by_id(id);
return new ResponseEntity<>(new EntityResponse("Deleted"), HttpStatus.OK);
}
@GetMapping("/Data_lake/merge/{id}")
public ResponseEntity<?> mergeBatchData(@PathVariable Integer id) {
try {
// Get merged JSON string from service
String mergedJson = Service.mergeBatchData(id);
// Parse the merged JSON string back into JSON structure
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(mergedJson);
// Return as actual JSON (not as string)
return ResponseEntity.ok(jsonNode);
} catch (Exception e) {
// Build error response manually (Map.of() not available in Java 8)
Map<String, String> error = new HashMap<>();
error.put("error", e.getMessage());
return ResponseEntity.status(500).body(error);
}
}
}

View File

@ -0,0 +1,26 @@
package com.realnet.DataLake.Entity;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Lob;
import lombok.Data;
@Entity
@Data
public class BatchData {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private Integer datalake_id;
@Lob
@Column(columnDefinition = "TEXT")
private String batchjson;
}

View File

@ -0,0 +1,51 @@
package com.realnet.DataLake.Entity;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Lob;
import com.realnet.WhoColumn.Entity.Extension;
import lombok.Data;
@Entity
@Data
public class Data_lake extends Extension {
/**
*
*/
private static final long serialVersionUID = 1L;
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String name;
private String url;
private String schedule;
private String cron_job;
@Lob
@Column(columnDefinition = "TEXT")
private String json;
private Integer batch_volume;
private Integer sure_connect_id;
private String sureconnect_name;
private String url_endpoint;
@Lob
@Column(columnDefinition = "TEXT")
private String calculated_field_json;
private Boolean iscalculatedfield;
}

View File

@ -0,0 +1,24 @@
package com.realnet.DataLake.Repository;
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import com.realnet.DataLake.Entity.BatchData;
@Repository
public interface BatchDataRepository extends JpaRepository<BatchData, Integer> {
@Query(value = "select * from batch_data where created_by=?1", nativeQuery = true)
List<BatchData> findAll(Long creayedBy);
@Query(value = "select * from batch_data where created_by=?1", nativeQuery = true)
Page<BatchData> findAll(Long creayedBy, Pageable page);
@Query(value = "select * from batch_data", nativeQuery = true)
List<BatchData> findByDatalake_id(Integer datalake_id);
}

View File

@ -0,0 +1,21 @@
package com.realnet.DataLake.Repository;
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import com.realnet.DataLake.Entity.Data_lake;
@Repository
public interface Data_lakeRepository extends JpaRepository<Data_lake, Integer> {
@Query(value = "select * from data_lake where created_by=?1", nativeQuery = true)
List<Data_lake> findAll(Long creayedBy);
@Query(value = "select * from data_lake where created_by=?1", nativeQuery = true)
Page<Data_lake> findAll(Long creayedBy, Pageable page);
}

View File

@ -0,0 +1,428 @@
package com.realnet.DataLake.Services;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.realnet.DataLake.Entity.BatchData;
import com.realnet.DataLake.Entity.Data_lake;
import com.realnet.DataLake.Repository.BatchDataRepository;
import com.realnet.DataLake.Repository.Data_lakeRepository;
import com.realnet.SureConnect.Service.SureService;
import com.realnet.realm.Entity.Realm;
import com.realnet.realm.Services.RealmService;
import com.realnet.users.entity1.AppUser;
import com.realnet.users.service1.AppUserServiceImpl;
import com.realnet.utils.Port_Constant;
@Service
public class Data_lakeService {
@Autowired
private Data_lakeRepository Repository;
@Autowired
private BatchDataRepository batchRepo;
@Autowired
private AppUserServiceImpl userService;
@Autowired
private RealmService realmService;
@Autowired
private SureService sureService;
public Data_lake Savedata(Data_lake data) {
data.setUpdatedBy(getUser().getUserId());
data.setCreatedBy(getUser().getUserId());
data.setAccountId(getUser().getAccount().getAccount_id());
if (data.getSure_connect_id() != null) {
data.setSureconnect_name(sureService.getbyid(data.getSure_connect_id()).getConnection_name());
}
Data_lake save = Repository.save(data);
return save;
}
// get all with pagination
public Page<Data_lake> getAllWithPagination(Pageable page) {
return Repository.findAll(getUser().getUserId(), page);
}
public List<Data_lake> getdetails() {
List<Realm> realm = realmService.findByUserId(getUser().getUserId());
List<Data_lake> all = Repository.findAll(getUser().getUserId());
return all;
}
public Data_lake getdetailsbyId(Integer id) {
return Repository.findById(id).get();
}
public void delete_by_id(Integer id) {
Repository.deleteById(id);
}
public Data_lake update(Data_lake data, Integer id) {
Data_lake old = Repository.findById(id).get();
// id auto-generated hai update nahi karenge
if (data.getName() != null) {
old.setName(data.getName());
}
if (data.getUrl() != null) {
old.setUrl(data.getUrl());
}
if (data.getSchedule() != null) {
old.setSchedule(data.getSchedule());
}
if (data.getCron_job() != null) {
old.setCron_job(data.getCron_job());
}
if (data.getJson() != null) {
old.setJson(data.getJson());
}
if (data.getBatch_volume() != null) {
old.setBatch_volume(data.getBatch_volume());
}
if (data.getSure_connect_id() != null) {
old.setSure_connect_id(data.getSure_connect_id());
old.setSureconnect_name(sureService.getbyid(data.getSure_connect_id()).getConnection_name());
}
if (data.getUrl_endpoint() != null) {
old.setUrl_endpoint(data.getUrl_endpoint());
}
if (data.getCalculated_field_json() != null) {
old.setCalculated_field_json(data.getCalculated_field_json());
}
if (data.getIscalculatedfield() != null) {
old.setIscalculatedfield(data.getIscalculatedfield());
}
final Data_lake test = Repository.save(old);
return test;
}
public Data_lake Updatejson(Integer id) throws JsonProcessingException {
Data_lake old = Repository.findById(id).get();
String url = old.getUrl();
ResponseEntity<String> response = GETAsString(url);
// Convert the JSON object (ArrayList, Map, etc.) to a String
// Object responseBody = response.getBody();
ObjectMapper mapper = new ObjectMapper();
String rawBody = response.getBody();
// String jsonString = mapper.writeValueAsString(rawBody);
ObjectMapper jsonMapper = new ObjectMapper();
Object responseBody;
// Detect and convert if XML
if (isXml(rawBody)) {
try {
// Convert XML JSON tree
XmlMapper xmlMapper = new XmlMapper();
JsonNode xmlNode = xmlMapper.readTree(rawBody.getBytes());
// Convert to standard JSON structure
String jsonFromXml = jsonMapper.writeValueAsString(xmlNode);
responseBody = jsonMapper.readValue(jsonFromXml, Object.class);
System.out.println("XML detected and converted to JSON successfully.");
} catch (Exception e) {
throw new RuntimeException("Failed to convert XML to JSON: " + e.getMessage());
}
} else {
// Normal JSON response
try {
responseBody = jsonMapper.readValue(rawBody, Object.class);
System.out.println("JSON response detected.");
} catch (Exception e) {
throw new RuntimeException("Invalid JSON format: " + e.getMessage());
}
}
// Handle calculated fields before batch processing
if (Boolean.TRUE.equals(old.getIscalculatedfield()) && old.getCalculated_field_json() != null) {
try {
responseBody = applyCalculatedFields(responseBody, old.getCalculated_field_json(), mapper);
System.out.println("Calculated fields applied successfully.");
} catch (Exception e) {
System.err.println("Failed to process calculated fields: " + e.getMessage());
}
}
String jsonString = mapper.writeValueAsString(responseBody);
old.setJson(jsonString);
// Process and insert into BatchData as before
// Process and insert into BatchData
processBatchData(old, responseBody, mapper);
String Url = Port_Constant.DOMAIN + "/Data_lake/Data_lake/merge/" + old.getId();
old.setUrl_endpoint(Url);
old.setUpdatedBy(getUser().getUserId());
final Data_lake saved = Repository.save(old);
System.out.println(" json updated..");
return saved;
}
@SuppressWarnings("unchecked")
private Object applyCalculatedFields(Object responseBody, String calculatedFieldJson, ObjectMapper mapper)
throws JsonProcessingException {
// Parse the calculated field JSON
List<Map<String, Object>> calcFields = mapper.readValue(calculatedFieldJson, List.class);
if (!(responseBody instanceof List)) {
// Wrap single object into a list for consistent processing
responseBody = Arrays.asList(responseBody);
}
List<Map<String, Object>> records = (List<Map<String, Object>>) responseBody;
for (Map<String, Object> record : records) {
for (Map<String, Object> calc : calcFields) {
String fieldName = (String) calc.get("fieldName");
String operation = (String) calc.get("operation");
List<Map<String, Object>> components = (List<Map<String, Object>>) calc.get("fieldComponents");
// Create constant fields separately
for (Map<String, Object> comp : components) {
String subField = (String) comp.get("field");
Boolean isConstant = comp.get("isConstant") != null && (Boolean) comp.get("isConstant");
Object constantValue = comp.get("constant");
if (isConstant && subField != null) {
record.put(subField, parseNumberOrString(constantValue));
}
}
// Collect operand values
List<Object> values = new ArrayList<>();
for (Map<String, Object> comp : components) {
Boolean isConstant = comp.get("isConstant") != null && (Boolean) comp.get("isConstant");
Object val = isConstant ? comp.get("constant") : record.get(comp.get("field"));
if (val != null) {
values.add(val);
}
}
// Compute final value
Object result = performOperation(values, operation);
record.put(fieldName, result);
}
}
return records;
}
private Object performOperation(List<Object> values, String operation) {
if (values.isEmpty())
return null;
switch (operation.toLowerCase()) {
case "add":
double sum = 0;
for (Object v : values)
sum += toDouble(v);
return sum;
case "subtract":
double result = toDouble(values.get(0));
for (int i = 1; i < values.size(); i++)
result -= toDouble(values.get(i));
return result;
case "multiply":
double prod = 1;
for (Object v : values)
prod *= toDouble(v);
return prod;
case "divide":
double div = toDouble(values.get(0));
for (int i = 1; i < values.size(); i++) {
double val = toDouble(values.get(i));
if (val != 0)
div /= val;
}
return div;
case "percentage":
if (values.size() < 2)
return null;
double num = toDouble(values.get(0));
double den = toDouble(values.get(1));
return den == 0 ? null : (num / den) * 100;
case "concat":
return values.stream().map(Object::toString).collect(Collectors.joining("_"));
default:
return null;
}
}
private double toDouble(Object val) {
if (val == null)
return 0.0;
try {
return Double.parseDouble(val.toString().trim());
} catch (NumberFormatException e) {
return 0.0;
}
}
private Object parseNumberOrString(Object val) {
if (val == null)
return null;
String str = val.toString().trim();
try {
return Double.parseDouble(str);
} catch (NumberFormatException e) {
return str;
}
}
private boolean isXml(String content) {
if (content == null)
return false;
String trimmed = content.trim();
// XML usually starts with '<' and ends with '>'
return trimmed.startsWith("<") && trimmed.endsWith(">");
}
private void processBatchData(Data_lake dataLake, Object responseBody, ObjectMapper mapper)
throws JsonProcessingException {
int batchVolume = (dataLake.getBatch_volume() != null && dataLake.getBatch_volume() > 0)
? dataLake.getBatch_volume()
: 100; // default batch size if not given
// Convert to JsonNode
JsonNode jsonNode = mapper.valueToTree(responseBody);
if (jsonNode.isArray()) {
ArrayNode arrayNode = (ArrayNode) jsonNode;
int total = arrayNode.size();
System.out.println("Total records: " + total);
for (int i = 0; i < total; i += batchVolume) {
int end = Math.min(i + batchVolume, total);
// Create a sub-array manually
ArrayNode subArray = mapper.createArrayNode();
for (int j = i; j < end; j++) {
subArray.add(arrayNode.get(j));
}
String subJson = mapper.writeValueAsString(subArray);
BatchData batch = new BatchData();
batch.setDatalake_id(dataLake.getId());
batch.setBatchjson(subJson);
batchRepo.save(batch);
}
System.out.println("Inserted " + (int) Math.ceil((double) total / batchVolume) + " batch records.");
} else {
// Single object one record
BatchData batch = new BatchData();
batch.setDatalake_id(dataLake.getId());
batch.setBatchjson(mapper.writeValueAsString(jsonNode));
batchRepo.save(batch);
System.out.println("Inserted single batch record.");
}
}
public String mergeBatchData(Integer datalakeId) throws Exception {
List<BatchData> batchList = batchRepo.findByDatalake_id(datalakeId);
if (batchList.isEmpty()) {
throw new RuntimeException("No batch data found for datalake_id: " + datalakeId);
}
ObjectMapper mapper = new ObjectMapper();
ArrayNode mergedArray = mapper.createArrayNode();
for (BatchData batch : batchList) {
String json = batch.getBatchjson();
JsonNode node = mapper.readTree(json);
if (node.isArray()) {
// Add each element to merged array
for (JsonNode item : node) {
mergedArray.add(item);
}
} else {
// Single object, just add directly
mergedArray.add(node);
}
}
String mergedJson = mapper.writeValueAsString(mergedArray);
System.out.println("Merged JSON size: " + mergedArray.size());
return mergedJson;
}
public ResponseEntity<Object> GET(String get) {
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<Object> u = restTemplate.getForEntity(get, Object.class);
return u;
}
public ResponseEntity<String> GETAsString(String url) {
RestTemplate restTemplate = new RestTemplate();
return restTemplate.getForEntity(url, String.class);
}
public AppUser getUser() {
AppUser user = userService.getLoggedInUser();
return user;
}
}